by Alex Coy (ac2456) and Jason Russo (jr826). A final project for ECE 5725.
This project aims to point a camera at a lecturer while also maintaining the ability to control the camera manually, record video to a local destination, or stream over a network.
Starting on roughly April 1st, 2020, the team drew plans for a simple
pan/tilt camera platform in OpenSCAD. Coy printed the parts on a pair
of inexpensive 3D printers and assembled them, drawing more as
necessary. The team developed software to control the pan/tilt platform,
and then added custom network control software. The team began to
explore person recognition, but ran out of time to fully refine it.
Check out our demonstration video here!
In this project, the team constructed their own Pan-Tilt-Zoom (PTZ) camera. PTZ cameras are noted for their ability to be controlled remotely. They can 'pan' left or right, 'tilt' up or down, and 'zoom' in or out. These three functionalities allow for PTZ cameras to cover a wide area. This is key for the ClassCam+'s main application of tracking an individual as they move about a room.
This project also utilizes OpenCV, the "Open Source Computer Vision Library". This open source computer vision and machine learning library provides facial detection and recognition, object identification, and person detection. The ClassCam+ aims to utilize OpenCV's person detection capabilities to track an individual as they move about the room.
The following flowchart depicts the flow of information throughout the system. The client sends camera control commands to the Raspberry Pi. The Pi interprets these commands and relays them to the platform, which performs the movement and thus points the camera. All the while, the Raspberry Pi receives the camera feed and streams it out to all of the clients.
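Both hops of that control path, the client-to-Pi socket and the Pi-to-Arduino serial link, use the same small byte frames built by makeCommandBytes in ptz_controller.py (full listing in the Code Appendix). A worked example of the framing:

# Frame layout produced by makeCommandBytes():
#   [cmd, 0, x_lo, x_hi, y_lo, y_hi, CMD_END]   for SETCOORD / GETCOORD
#   [cmd, CMD_END]                              for every other command
from ptz_controller import Commands, makeCommandBytes

frame = makeCommandBytes(Commands.CMD_SETCOORD, 5120, 4096)
# frame == bytes([0x3A, 0x00, 0x00, 0x14, 0x00, 0x10, 0xAF])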
Every piece of the custom pan/tilt platform was drawn in OpenSCAD with the idea that the parts should be mostly 3D-printable, as that was the primary constraint on making custom parts. As a result, the team used an assembly file to keep track of the key structural parts.
See the source repository for the OpenSCAD files used, plus the OpenSCAD libraries they require. We chose stepper motors because they were already available in the home lab and because their robustness at low cost suits this design. We chose to drive the stepper motors from an Arduino rather than from the Pi because it is very hard to make the Pi perform any sort of true realtime operation. Using the Arduino as a motor coprocessor allows the Pi to apply minimal computing power toward controlling the motors; the exchange between the two is sketched below. Lastly, we chose infrared optical limit switches, consisting of an IR LED and a reverse-biased IR photodiode, because the home lab did not have any traditional limit switches.
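A condensed view of that coprocessor handshake, taken from PTZ.command() in ptz_controller.py (the retry loop is omitted here): the Pi writes a command frame, the Arduino echoes it back, the Pi confirms with CMD_OK, and the Arduino replies CMD_DONE once the move is finished.

import serial
from ptz_controller import Commands, makeCommandBytes

ser = serial.Serial(port="/dev/ttyACM0", baudrate=115200)
toExec = makeCommandBytes(Commands.CMD_SETCOORD, 5120, 5120)
ser.write(toExec)                           # Pi -> Arduino: command frame
echo = ser.read(len(toExec))                # Arduino -> Pi: echoes the frame back
ser.write(bytes([Commands.CMD_OK.value]))   # Pi -> Arduino: confirm the echo matched
done = ser.read(1)                          # Arduino -> Pi: CMD_DONE once the move completes
assert done[0] == Commands.CMD_DONE.value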
The client connects to the server by running client.py. The client code creates a thread to connect to the server and receive the camera feed.
class myVC():
    def __init__(self):
        context = zmq.Context()
        self.socket = context.socket(zmq.SUB)
        self.socket.connect('tcp://alexcoy.duckdns.org:5002')
        self.socket.setsockopt_string(zmq.SUBSCRIBE, np.unicode(''))

    def getFrame(self):
        image_string = self.socket.recv_string()
        raw_image = base64.b64decode(image_string)
        image = np.frombuffer(raw_image, dtype=np.uint8)
        frame = cv2.imdecode(image, 1)
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        return frame
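On the other end of this feed, the Pi publishes frames on a matching ZeroMQ PUB socket. Condensed from the server's VCThread (full listing in the Code Appendix):

import base64
import cv2
import zmq

context = zmq.Context()
imgsocket = context.socket(zmq.PUB)
imgsocket.bind('tcp://*:5002')               # same port the client subscribes to

camera = cv2.VideoCapture(0)
ret, frame = camera.read()                   # grab one frame from the Pi camera
frame = cv2.resize(frame, (640, 480))
encoded, buf = cv2.imencode('.jpg', frame)   # JPEG-compress the frame
imgsocket.send(base64.b64encode(buf))        # base64-encode so it travels as a string
camera.release()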
class SerialApp(Frame):
    def __init__(self, master=None):
        super().__init__(master)
        self.master = master
        self.pack()
        self.initWidgets()
        self.errorMsg = False
        self.vid = myVC()
        self.canvas = Canvas(self, width=640, height=480)
        self.canvas.pack()
        self.outgoing = makeCommandBytes(Commands.CMD_GETCOORD, 25, 30)
        self.after(1, self.paintCanvas)
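The paintCanvas callback scheduled above, taken from the full client.py listing in the Code Appendix, pulls the latest frame, redraws the Tkinter canvas, and reschedules itself:

    def paintCanvas(self):
        frame = self.vid.getFrame()
        self.photo = PIL.ImageTk.PhotoImage(image=PIL.Image.fromarray(frame))
        self.canvas.create_image(0, 0, image=self.photo, anchor=NW)
        self.after(15, self.paintCanvas)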
We created a custom Detection class, shown below, that uses OpenCV to receive frames and process them with the HOG (Histogram of Oriented Gradients) descriptor and the defaultPeopleDetector.
class Detection(threading.Thread):
    def __init__(self):
        # init HOG descriptor/person detection. HOG is Histogram of
        # Oriented Gradients. It is used for image detection. In our case
        # it will be used for people detection.
        threading.Thread.__init__(self)
        self.hog = cv2.HOGDescriptor()
        self.hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
        self.frame = False
        self.newFrame = False
        self.boxes = np.array([])
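The rest of the thread plumbing, from the full video.py listing in the Code Appendix, is small: the video thread hands frames in with setFrame, the run loop calls detect on each new frame, and getCurrentBoxes returns a copy of the latest results.

    def run(self):
        while(True):
            if(not self.newFrame):
                time.sleep(0.1)
            else:
                self.boxes = self.detect(self.frame)
                self.newFrame = False

    def setFrame(self, frame):
        if (not self.newFrame):
            self.frame = frame
            self.newFrame = True

    def getCurrentBoxes(self):
        return np.copy(self.boxes)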
The Detection class runs the following detect function on a given frame to search for people within the frame. The frame is converted to grayscale in order to speed up the image processing. Then a HOG detection function is called and returns bounding boxes for the detected people. The boxes are formatted as numpy arrays and are returned to be displayed on screen.
def detect(self, frame):
    # use greyscale for faster detection
    gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
    # detect people in the image
    # returns the bounding boxes for the detected objects
    boxes, weights = self.hog.detectMultiScale(gray, winStride=(8, 8))
    # convert (x, y, w, h) boxes to (x1, y1, x2, y2) corners
    boxes = np.array([[x, y, x + w, y + h] for (x, y, w, h) in boxes])
    return boxes
The AutoUpdate function is used when the camera is in auto mode to move the camera and follow along with the person on screen. This function evaluates the boxes stored in self.boxes (updated by the detect function), computes the midpoint of the bounding boxes on screen, and compares it to the center of the camera's view. If that midpoint is beyond a defined threshold from the center in any direction, a command is sent to the camera to move one step in that direction. The threshold exists to keep the system from oscillating about the center point. We also chose to move in steps instead of simply inputting the midpoint and moving there, because that could lead to the camera jumping wildly about the screen and moving too fast.
def AutoUpdate(self, thePTZ):
    # average values
    avg_x = []
    avg_y = []
    new_x = 0
    new_y = 0
    if (self.boxes.size == 0):
        return
    # loop through boxes and collect the midpoint of each
    for i in self.boxes:
        avg_x.append((i[0] + i[2]) / 2)
        avg_y.append((i[1] + i[3]) / 2)
    for x in avg_x:
        new_x = new_x + x
    if (len(avg_x) > 0):
        new_x = new_x / len(avg_x)
    for y in avg_y:
        new_y = new_y + y
    if (len(avg_y) > 0):
        new_y = new_y / len(avg_y)
    framex = self.frame.shape[1]
    framey = self.frame.shape[0]
    # compute difference between camera center and bounding box center
    xdif = framex / 2 - new_x
    ydif = framey / 2 - new_y
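The remainder of AutoUpdate, condensed from the full video.py listing in the Code Appendix, applies the jDist dead-band (40 pixels in video.py) and issues a single relative SETCOORD step toward the person:

    dx = 0
    dy = 0
    if (abs(xdif) > jDist):
        # step the pan axis one jog toward the person
        dx = jDist if (xdif < 0) else -jDist
    if (abs(ydif) > jDist):
        # step the tilt axis one jog toward the person
        dy = -jDist if (ydif < 0) else jDist
    thePTZ.command(Commands.CMD_SETCOORD, thePTZ.pan + dx, thePTZ.tilt + dy)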
Another option that was explored for person detection was utilizing face detection instead of full body detection. This idea was inspired by the Automated Lamp project from this very course in Fall 2019. Unfortunately, the team did not have enough time to fully explore this option or to refine the person detection enough to provide more consistent results.
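For reference, a minimal sketch of what that face-detection alternative could look like using OpenCV's bundled Haar cascade. This was not part of the final system; the cascade file and parameters here are illustrative assumptions only.

import cv2

# Illustrative only: swap the HOG person detector for OpenCV's Haar face cascade
face_cascade = cv2.CascadeClassifier(
    cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

def detect_faces(frame):
    gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
    faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5)
    # same (x, y, w, h) -> corner-format conversion used by detect()
    return [[x, y, x + w, y + h] for (x, y, w, h) in faces]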
The team was able to do a significant amount of system testing by connecting to the system remotely. Coy ran the server at his house while Russo connected from his. Russo could then send commands to the camera, and both teammates could observe the system's response. For people detection, Coy positioned himself in front of the camera and the team observed as the system attempted to track his movement. It is difficult to write an automated test suite for a camera system like this one, so most functionality testing was done by executing the functions and observing the response.
In our code repository, there are remnants of a web application. The initial idea for this project was for it to be something that any device with a web browser could use. As such, a simple PHP application, runnable on the Pi, exists. The main snag we ran into was creating a live stream: it turns out that encoding browser-streamable video on the fly is hard. There is no tool available that can take the hardware-compressed H.264 stream from the Pi's VideoCore IV GPU, remux it into a fragmented format, and segment the fragments in one pass. We experimented with a two-pass solution, but ultimately scrapped it because of astronomical latency.
The team was able to achieve most of the original project goals. By the end, the team had constructed a camera streaming system that could be streamed to multiple viewers at the same time and controlled remotely by one viewer. The controller could manually control the camera as well as enter an 'auto' mode that would track a person as they moved about the room. The shortcomings of this project fell in the person detection. The system does not reliably detect a person on the screen, so the camera sometimes fails to follow the individual as they move about, or even detects an inanimate object as a person and moves to that instead. The camera feed is reliable, however, and can be viewed remotely with little latency. The camera also responds to remote commands with little latency. The team was able to construct an efficient and reliable network setup for the ClassCam+, a pivotal part of the design of the project.
Overall, the project was a success. The team was able to continue working despite the setbacks caused by COVID-19 and the move off campus. The team was able to explore different options for the project, even if they didn't all fully pan out (such as the browser view and facial detection as opposed to full body detection). The team faced some additional setbacks with OpenCV setup, but was able to move past this as well. In the end, the team was running the OpenCV defaultPeopleDetector for recognizing a person. Although it did not always produce reliable results, the team made good progress and sees this as an area for future improvement.
The team sees multiple areas of future improvement for the project. Perhaps most importantly, the team would work to make the person detection more reliable, or even switch to another method of detection (such as facial detection, as mentioned earlier). This would help the ClassCam+ reach its desired state of a fully automated lecture streaming system.
Another area of improvement is the saving of lecture videos. As of now, video is only streamed and is then lost. With more time, the team would explore means of saving the video, giving students and professors recordings of the lectures for future reference. This was an idea that was originally considered for the project, but could not be fully explored due to time constraints.
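As a starting point for that future work, frames could be written to disk in the same loop that publishes them. Below is a minimal sketch using OpenCV's VideoWriter; the filename, codec, and frame rate are assumptions and not part of the current system.

import cv2

camera = cv2.VideoCapture(0)
# XVID-in-AVI at 20 fps, chosen arbitrarily for illustration
writer = cv2.VideoWriter('lecture.avi', cv2.VideoWriter_fourcc(*'XVID'),
                         20.0, (640, 480))

while True:
    ret, frame = camera.read()
    if not ret:
        break
    frame = cv2.resize(frame, (640, 480))
    writer.write(frame)   # save the frame alongside the live stream

writer.release()
camera.release()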
Lastly, the team would consider different ways of making the stream more accessible. This was the original intent of the browser streaming, but that feature could not be implemented given the design of the system. To get this feature working, the team may have to make significant changes to the system structure. With enough time, however, this may be a worthwhile endeavor.
Item | Cost
28BYJ-48 Stepper Motor/Driver Boards (2 of a 5-pack) | $4.80
Arducam OV5647 Camera with CS Mount Lens | $30.00
0.75 kg Black 3D Printer Filament (at most) | $15.00
Elegoo Uno R3 | $13.00
IR LEDs and Photodiodes | $0.50
Jumper and normal wire | $1.00
M3 Screws and Standoffs | $1.00
Total Cost | $65.30
Russo focused primarily on OpenCV. He set up the person detection and the camera's response to track a person as they move about the room. He also wrote the video tracking script and contributed to the Design, Results, Conclusion, and Future Work sections of the website.
Coy focused primarily on the network, responsiveness, and physical portions of the project. He wrote and/or reviewed the "guts" of anything involving networking or physical hardware, such as the video networking code (which was taken from an unnoted example of a pub/sub model for OpenCV frame sharing; sorry!) and the PTZ abstraction (completely his own, built from reading library documentation).
This project features a rather large code base. The entire code base can be found in this GitHub repo: PTZ Project Repo. Below are a few of the key files used in this project.
client.py:

#! /usr/bin/env python3
from tkinter import *
from tkinter.ttk import *
from ptz_controller import Commands, makeCommandBytes
import socket
import cv2
import zmq
import base64
import numpy as np
import PIL.Image
import PIL.ImageTk

jDist = 768


class myVC():
    def __init__(self):
        context = zmq.Context()
        self.socket = context.socket(zmq.SUB)
        self.socket.connect('tcp://alexcoy.duckdns.org:5002')
        self.socket.setsockopt_string(zmq.SUBSCRIBE, np.unicode(''))

    def getFrame(self):
        image_string = self.socket.recv_string()
        raw_image = base64.b64decode(image_string)
        image = np.frombuffer(raw_image, dtype=np.uint8)
        frame = cv2.imdecode(image, 1)
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        return frame


class SerialApp(Frame):
    def __init__(self, master=None):
        super().__init__(master)
        self.master = master
        self.pack()
        self.initWidgets()
        self.errorMsg = False
        self.vid = myVC()
        self.canvas = Canvas(self, width=640, height=480)
        self.canvas.pack()
        self.outgoing = makeCommandBytes(Commands.CMD_GETCOORD, 25, 30)
        self.after(15, self.paintCanvas)

    def paintCanvas(self):
        frame = self.vid.getFrame()
        self.photo = PIL.ImageTk.PhotoImage(image=PIL.Image.fromarray(frame))
        self.canvas.create_image(0, 0, image=self.photo, anchor=NW)
        self.after(15, self.paintCanvas)

    def initWidgets(self):
        self.bottomFrame = Frame(self)
        self.bottomFrame.pack(side="bottom")
        self.startButton = Button(self.bottomFrame)
        self.startButton["text"] = "Connect"
        self.startButton["command"] = self.try_connect
        self.aChoice = Entry(self.bottomFrame)
        self.aChoice["width"] = 20
        self.aLabel = Label(self.bottomFrame)
        self.aLabel["text"] = "Connect to: "
        self.startButton.pack(side="left")
        self.aLabel.pack(side="left")
        self.aChoice.pack(side="left")

    def homeOrJog(self):
        self.startButton["text"] = "Home"
        self.startButton["command"] = (self.doHome)
        self.midFrame = Frame(self)
        self.xyGroup = Frame(self.midFrame)
        self.xGroup = Frame(self.xyGroup)
        self.yGroup = Frame(self.xyGroup)
        self.midFrame.pack(side="top")
        self.goButton = Button(self.midFrame)
        self.goButton["text"] = "Go"
        self.goButton["width"] = 2
        self.goButton["command"] = self.doSet
        self.panCCWButton = Button(self.midFrame)
        self.panCCWButton["text"] = "L"
        self.panCCWButton["width"] = 2
        self.panCCWButton["command"] = self.panCCW
        self.panCWButton = Button(self.midFrame)
        self.panCWButton["text"] = "R"
        self.panCWButton["width"] = 2
        self.panCWButton["command"] = self.panCW
        self.tiltCCWButton = Button(self.midFrame)
        self.tiltCCWButton["text"] = "D"
        self.tiltCCWButton["width"] = 2
        self.tiltCCWButton["command"] = self.tiltCCW
        self.tiltCWButton = Button(self.midFrame)
        self.tiltCWButton["text"] = "U"
        self.tiltCWButton["width"] = 2
        self.tiltCWButton["command"] = self.tiltCW
        self.autoButton = Button(self.midFrame)
        self.autoButton["text"] = "auto"
        self.autoButton["width"] = 6
        self.autoButton["command"] = self.auto
        self.xEntry = Entry(self.xGroup)
        self.xEntry["width"] = 5
        self.xLabel = Label(self.xGroup)
        self.xLabel["text"] = "Pan: "
        self.xEntry.pack(side="right")
        self.xEntry.pack(side="left")
        self.yEntry = Entry(self.yGroup)
        self.yEntry["width"] = 5
        self.yLabel = Label(self.yGroup)
        self.yLabel["text"] = "Tilt: "
        self.yEntry.pack(side="right")
        self.yEntry.pack(side="left")
        self.xGroup.pack(side="left")
        self.yGroup.pack(side="right")
        self.xyGroup.pack(side="left")
        self.autoButton.pack(side="right")
        self.goButton.pack(side="right")
        self.panCWButton.pack(side="right")
        self.panCCWButton.pack(side="right")
        self.tiltCWButton.pack(side="right")
        self.tiltCCWButton.pack(side="right")
        self.midFrame.pack()
        self.bottomFrame.pack()

    def try_connect(self):
        print("Trying to connect to " + self.aChoice.get())
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        socket.setdefaulttimeout(0.25)
        try:
            self.sock.connect((self.aChoice.get(), 5000))
            self.after(250, self.heartbeat)
            self.actualMessage = False
            self.processingSet = False
            self.homeOrJog()
        except:
            if (self.errorMsg):
                self.errorMsg.pack_forget()
            self.errorMsg = Label(self)
            self.errorMsg["text"] = "Couldn't connect"
            self.errorMsg.pack()

    def heartbeat(self):
        if (self.actualMessage):
            self.actualMessage = False
            #self.outgoing = makeCommandBytes(Commands.CMD_GETCOORD, 25, 30)
        else:
            self.sock.send(self.outgoing)
        m_in = self.sock.recv(256)
        # print("got ", m_in)
        if (len(m_in) > 5 and m_in[0] == Commands.CMD_GETCOORD.value):
            x = (m_in[3] << 8) + m_in[4]
            y = (m_in[5] << 8) + m_in[2]
            self.xEntry.delete(0, END)
            self.xEntry.insert(0, str(x))
            self.yEntry.delete(0, END)
            self.yEntry.insert(0, str(y))
            # print(str(y))
        self.outgoing = makeCommandBytes(Commands.CMD_OK, 25, 30)
        self.after(250, self.heartbeat)

    def replaceX(self, x):
        self.xEntry.delete(0, END)
        self.xEntry.insert(0, str(x))

    def replaceY(self, y):
        self.yEntry.delete(0, END)
        self.yEntry.insert(0, str(y))

    def doSet(self):
        self.sock.send(makeCommandBytes(
            Commands.CMD_SETCOORD, int(self.xEntry.get()), int(self.yEntry.get())))
        self.actualMessage = True

    def tiltCCW(self):
        self.sock.send(makeCommandBytes(
            Commands.CMD_SETCOORD, int(self.xEntry.get()),
            int(self.yEntry.get()) - jDist))
        self.replaceY(int(self.yEntry.get()) - jDist)
        self.actualMessage = True

    def tiltCW(self):
        self.sock.send(makeCommandBytes(
            Commands.CMD_SETCOORD, int(self.xEntry.get()),
            int(self.yEntry.get()) + jDist))
        self.replaceY(int(self.yEntry.get()) + jDist)
        self.actualMessage = True

    def panCCW(self):
        self.sock.send(makeCommandBytes(
            Commands.CMD_SETCOORD, int(self.xEntry.get()) - jDist,
            int(self.yEntry.get())))
        self.replaceX(int(self.xEntry.get()) - jDist)
        self.actualMessage = True

    def panCW(self):
        self.sock.send(makeCommandBytes(
            Commands.CMD_SETCOORD, int(self.xEntry.get()) + jDist,
            int(self.yEntry.get())))
        self.replaceX(int(self.xEntry.get()) + jDist)
        self.actualMessage = True

    def auto(self):
        self.sock.send(makeCommandBytes(Commands.CMD_SETAUTO, 0, 0))

    def doHome(self):
        self.sock.send(makeCommandBytes(Commands.CMD_HOME, 0, 0))
        self.replaceX(5120)
        self.replaceY(5120)
        self.actualMessage = True


def main():
    top = Tk()
    app = SerialApp(master=top)
    top.mainloop()
    quit()


if (__name__ == "__main__"):
    main()
The server:

#!/usr/bin/env python3
import socket
import threading  # Import socket module
from ptz_controller import PTZ, Commands, makeCommandBytes
import time
import base64
import cv2
import zmq
from video import Detection


class VCThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        context = zmq.Context()
        self.imgsocket = context.socket(zmq.PUB)
        self.imgsocket.bind('tcp://*:5002')
        self.detector = Detection()
        self.detector.start()
        self.camera = cv2.VideoCapture(0)
        self.go = True

    class VCException(Exception):
        pass

    def run(self):
        while(self.go):
            ret, frame = self.camera.read()
            self.detector.setFrame(frame)
            boxes = self.detector.getCurrentBoxes()
            if (thePTZ.Auto == True):
                self.detector.AutoUpdate(thePTZ)
            if (boxes.size > 0):
                cv2.rectangle(
                    frame, (boxes[0][0], boxes[0, 1]),
                    (boxes[0][2], boxes[0][3]), (255, 0, 0), 10)
            frame = cv2.resize(frame, (640, 480))
            encoded, buf = cv2.imencode('.jpg', frame)
            # cv2.imshow("frame", frame)
            # cv2.waitKey(1)
            image = base64.b64encode(buf)
            self.imgsocket.send(image)
        self.camera.release()
        cv2.destroyAllWindows()

    def stopThread(self):
        self.go = False


class ClientThread(threading.Thread):
    def __init__(self, clientsocket, clientAddress, isPrimary):
        threading.Thread.__init__(self)
        self.csocket = clientsocket
        self.caddr = clientAddress
        print("New connection added: ", clientAddress)
        self.runAuto = True

    def run(self):
        global primary_c
        while True:
            try:
                self.csocket.send(bytes([Commands.CMD_OK.value]))
                time.sleep(0.25)
                data = self.csocket.recv(2048)
            except socket.timeout:
                self.csocket.close()
                print("Timeout")
                primary_c = False
                break
            except BrokenPipeError:
                print("Broken Pipe")
                primary_c = False
                break
            except ConnectionResetError:
                print("Connection Reset")
                primary_c = False
                break
            result = self.parseCommand(data)
            if (result == 0):
                break
            if (result == Commands.CMD_NOPE.value):
                print("nope")
        print("Client at ", self.caddr, " disconnected...")

    def parseCommand(self, m_in):
        global thePTZ
        error = False
        end = len(m_in)
        if (end < 1):
            error = True
        elif (m_in[0] == Commands.CMD_OK.value):
            pass
            # print("Got ok")
        elif (m_in[0] == Commands.CMD_HOME.value):
            print("Home")
            thePTZ.command(Commands.CMD_HOME)
        elif (m_in[0] == Commands.CMD_SETAUTO.value):
            thePTZ.command(Commands.CMD_SETAUTO)
        elif (m_in[0] == Commands.CMD_SETCOORD.value):
            if (end < 6):
                error = True
            else:
                x = (m_in[3] << 8) + m_in[2]
                y = (m_in[5] << 8) + m_in[4]
                print("Move to "+str(x)+", "+str(y))
                thePTZ.command(Commands.CMD_SETCOORD, x, y)
        elif (m_in[0] == Commands.CMD_GETCOORD.value):
            toSend = makeCommandBytes(
                Commands.CMD_GETCOORD, thePTZ.pan, thePTZ.tilt)
            print(f'Getcoord with pan = {thePTZ.pan: d}. ')
            try:
                self.csocket.send(toSend)
            except socket.timeout:
                self.csocket.close()
                print("Timeout")
                primary_c = False
                return 0
            except BrokenPipeError:
                print("Broken Pipe")
                primary_c = False
                return 0
            except ConnectionResetError:
                print("Connection Reset")
                primary_c = False
                return 0
        return Commands.CMD_NOPE.value if error else Commands.CMD_DONE.value


s = socket.socket()
s.bind(('', 5000))
s.listen(5)
primary_c = False
thePTZ = PTZ("/dev/ttyACM0")
videoThread = VCThread()
videoThread.start()
c = ''
try:
    while True:
        c, addr = s.accept()  # Establish connection with client.
        c.settimeout(2.0)
        if (not primary_c):
            newthread = ClientThread(c, addr, True)
            primary_c = True
            newthread.start()
        else:
            print('Got connection from', addr)
            c.send(b'Busy')
            c.close()  # Close the connection
except KeyboardInterrupt:
    if (c != ''):
        c.close()
    s.close()
    videoThread.stopThread()
ptz_controller.py:

import serial
from enum import Enum
from time import sleep


def makeCommandBytes(cmd, x=0, y=0):
    if cmd == Commands.CMD_SETCOORD or cmd == Commands.CMD_GETCOORD:
        toExec = bytes([cmd.value, 0, x & 0xff, (x & 0xff00) >> 8,
                        y & 0xff, (y & 0xff00) >> 8, Commands.CMD_END.value])
        #print("Sending a setcoord")
    else:
        toExec = bytes([cmd.value, Commands.CMD_END.value])
    return toExec


class PTZ:
    def __init__(self, port):
        self.ser = serial.Serial(
            port=port,
            baudrate=115200
        )
        self.pan = 5120
        self.tilt = 5120
        sleep(3)
        cmd = bytes([Commands.CMD_GIVE_HWINFO.value, Commands.CMD_END.value])
        self.Auto = True
        self.ser.reset_input_buffer()
        #print(list(map(lambda x: hex(x), cmd)))
        self.ser.write(cmd)
        rec = self.ser.read(2)
        #print(hex(rec[0]), hex(rec[1]))
        if(rec == cmd):
            # sleep(3)
            self.ser.write(bytes([Commands.CMD_OK.value]))
            info = self.ser.read(3)
            print(f"Version {info[0]:d} with {info[1]:d} axes")
            if(info[2] == Commands.CMD_DONE.value):
                self.status = "OK"
            else:
                self.status = "FAIL"
        # self.home()
        if (False):
            for _ in range(0, 20):
                self.command(Commands.CMD_JOG_PAN_CCW)
                self.command(Commands.CMD_JOG_TILT_CCW)
                self.command(Commands.CMD_JOG_PAN_CW)
                self.command(Commands.CMD_JOG_TILT_CW)
                self.command(Commands.CMD_JOG_PAN_CW)
                self.command(Commands.CMD_JOG_TILT_CW)
                self.command(Commands.CMD_JOG_PAN_CCW)
                self.command(Commands.CMD_JOG_TILT_CCW)

    def command(self, cmd, x=0, y=0):
        x = x if x <= 10240 and x >= 0 else 10240 if x > 10240 else 0
        y = y if y <= 10240 and y >= 0 else 10240 if y > 10240 else 0
        if (cmd == Commands.CMD_SETAUTO):
            self.Auto = not self.Auto
            return
        if (cmd == Commands.CMD_SETCOORD):
            self.pan = x
            self.tilt = y
        toExec = makeCommandBytes(cmd, x, y)
        self.ser.write(toExec)
        rec = self.ser.read(len(toExec))
        #print("Wrote " + str(toExec))
        #print("Got " + str(rec))
        while (rec != toExec):
            self.ser.reset_input_buffer()
            sleep(1)
            self.ser.write(toExec)
            rec = self.ser.read(2)
            #print("Wrote " + str(toExec))
            #print("Got " + str(rec))
        self.ser.write(bytes([Commands.CMD_OK.value]))
        rec = self.ser.read(1)
        assert rec[0] == Commands.CMD_DONE.value, \
            "Wanted "+hex(Commands.CMD_DONE.value)+", Got "+hex(rec[0])

    def home(self):
        self.command(Commands.CMD_HOME)
        #self.command(Commands.CMD_SETCOORD, 5120, 5120)
        self.pan = 5120
        self.tilt = 5120


class Commands(Enum):
    CMD_OK = 0xAA
    CMD_END = 0xAF
    CMD_HOME = 0xBF
    CMD_JOG_PAN_CCW = 0x1A
    CMD_JOG_TILT_CCW = 0x1B
    CMD_JOG_PAN_CW = 0x2A
    CMD_JOG_TILT_CW = 0x2B
    CMD_GIVE_HWINFO = 0xD3
    CMD_SETCOORD = 0x3A
    CMD_GETCOORD = 0x3B
    CMD_DONE = 0xFA
    CMD_NOPE = 0xFC
    CMD_SETAUTO = 0xFE
video.py:

#!/usr/bin/env python3
import numpy as np
import cv2
import threading
import socket
import time
import zmq
import base64
from ptz_controller import Commands

jDist = 40

# code based on thedatafrog.com/en/articles/human-detection-video/


# thread class used to run video in seperate thread
class VCThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        context = zmq.Context()
        self.imgsocket = context.socket(zmq.PUB)
        self.imgsocket.bind('tcp://*:5002')
        self.camera = cv2.VideoCapture(0)
        self.go = True
        self.detector = Detection()
        self.detector.start()

    class VCException(Exception):
        pass

    def run(self):
        while(self.go):
            ret, frame = self.camera.read()
            self.detector.setFrame(frame)
            boxes = self.detector.getCurrentBoxes()
            if (boxes.size > 0):
                cv2.rectangle(
                    frame, (boxes[0][0], boxes[0, 1]),
                    (boxes[0][2], boxes[0][3]), (255, 0, 0), 10)
            frame = cv2.resize(frame, (640, 480))
            #gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
            encoded, buf = cv2.imencode('.jpg', frame)
            # cv2.imshow("frame", frame)
            # cv2.waitKey(1)
            image = base64.b64encode(buf)
            self.imgsocket.send(image)
            if (thePTZ.Auto == True):
                self.detector.AutoUpdate()
        self.camera.release()
        cv2.destroyAllWindows()

    def stopThread(self):
        self.go = False


class Detection(threading.Thread):
    def __init__(self):
        # init HOG descriptor/person detection. HOG is Histogram of
        # Oriented Gradients. It is used for image detection. In our case
        # it will be used for people detection.
        threading.Thread.__init__(self)
        self.hog = cv2.HOGDescriptor()
        self.hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
        self.frame = False
        self.newFrame = False
        self.boxes = np.array([])

    def run(self):
        while(True):
            if(not self.newFrame):
                time.sleep(0.1)
            else:
                self.boxes = self.detect(self.frame)
                self.newFrame = False

    def setFrame(self, frame):
        if (not self.newFrame):
            self.frame = frame
            self.newFrame = True

    def getCurrentBoxes(self):
        return np.copy(self.boxes)

    def detect(self, frame):
        # use greyscale for faster detection
        gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        # detect people in the image
        # returns the bounding boxes for the detected objects
        boxes, weights = self.hog.detectMultiScale(gray, winStride=(8, 8))
        # convert (x, y, w, h) boxes to (x1, y1, x2, y2) corners
        boxes = np.array([[x, y, x + w, y + h] for (x, y, w, h) in boxes])
        return boxes

    def AutoUpdate(self, thePTZ):
        # average values
        avg_x = []
        avg_y = []
        new_x = 0
        new_y = 0
        if (self.boxes.size == 0):
            return
        # loop through boxes (person would have multiple boxes. Could be
        # problematic with multiple people on screen)
        for i in self.boxes:
            # midpoint of each box
            avg_x.append((i[0] + i[2]) / 2)
            avg_y.append((i[1] + i[3]) / 2)
        for x in avg_x:
            new_x = new_x + x
        if (len(avg_x) > 0):
            new_x = new_x / len(avg_x)
        for y in avg_y:
            new_y = new_y + y
        if (len(avg_y) > 0):
            new_y = new_y / len(avg_y)
        framex = self.frame.shape[1]
        framey = self.frame.shape[0]
        #print(f"Frame is {framex:5d} x {framey:5d}")
        # compute difference between camera center and bounding box center
        xdif = framex/2 - new_x
        ydif = framey/2 - new_y
        dx = 0
        dy = 0
        if (abs(xdif) > jDist):
            # if person to the right
            if (xdif < 0):
                # move right
                dx = jDist
            else:
                # move left
                dx = -jDist
        # if ydif worth moving
        if (abs(ydif) > jDist):
            # if person is up
            if (ydif < 0):
                # look up
                dy = -jDist
            else:
                # look down
                dy = jDist
        thePTZ.command(Commands.CMD_SETCOORD, thePTZ.pan + dx, thePTZ.tilt + dy)